package com.deodar.kettle.platform.monitor.executor;

import com.deodar.common.utils.IDUtil;
import com.deodar.kettle.platform.common.state.TaskStatusState;
import com.deodar.kettle.platform.common.util.KettleConstant;
import com.deodar.kettle.platform.common.util.KettleImagesUtil;
import com.deodar.kettle.platform.common.util.KettleSpringContext;
import com.deodar.kettle.platform.database.domain.RnExecutionLog;
import com.deodar.kettle.platform.database.domain.RnStepStatus;
import com.deodar.kettle.platform.database.service.IRnExecutionLogService;
import com.deodar.kettle.platform.database.service.impl.RnExecutionLogServiceImpl;
import com.deodar.kettle.platform.monitor.service.SlaveService;
import com.deodar.kettle.platform.monitor.util.KettleConfigUtil;
import com.deodar.kettle.platform.monitor.util.TaskManageUtil;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.trans.Trans;
import org.pentaho.di.trans.TransExecutionConfiguration;
import org.pentaho.di.trans.TransMeta;
import org.pentaho.di.trans.step.StepStatus;
import org.pentaho.di.www.SlaveServerTransStatus;

import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * @author: Administrator
 * @Date: 2020/04/14 19:25
 * @Description:
 * @version:1.0.0
 */
@Slf4j
public class TransExecutor implements Callable<String> {

    TransExecutionConfiguration executionConfiguration;
    TransMeta transMeta;

    Trans trans;

    boolean isClickStop=false;

    String carteObjectId;

    String logId;

    String key;

    RnExecutionLog executionLog;

    long errCount; //错误数

    public TransExecutor(TransExecutionConfiguration executionConfiguration, TransMeta transMeta, String logId) {
        this.executionConfiguration = executionConfiguration;
        this.transMeta = transMeta;
        this.logId = logId;
    }

    @Override
    public String call() throws Exception {
        run();
        return null;
    }


    @SneakyThrows
    private void run(){
        IRnExecutionLogService rnExecutionLogService = KettleSpringContext.getBean(IRnExecutionLogService.class);
        boolean insertFlag = false;
        if(logId == null || "".equals(logId)){
            logId = IDUtil.getSnowflakeId();
            log.info("trans logId:"+logId);
            executionLog = new RnExecutionLog();
            executionLog.setId(logId);
            insertFlag = true;
        }else {
            executionLog = rnExecutionLogService.selectRnExecutionLogById(logId);
        }

        key = KettleConstant.KEY_PREFIX+logId;
        try {
            TaskManageUtil.getInstance().add(key,this);


            int status = TaskStatusState.STATE_RUNNING.getStatus();
            executionLog.setStartTime(new Date());
            executionLog.setStatus(status);
            executionLog.setTaskName(transMeta.getName());
            executionLog.setTaskType(1);



            trans = new Trans( transMeta );
            if(insertFlag){
                rnExecutionLogService.insertRnExecutionLog(executionLog);
            }else {
                rnExecutionLogService.updateRnExecutionLog(executionLog);
            }
            SlaveService slaveService = KettleSpringContext.getBean(SlaveService.class);
            SlaveServer slaveServer = slaveService.getMinLoad();
            executionLog.setExecutionMethod("远程："+slaveServer.getHostname());
            rnExecutionLogService.updateRnExecutionLog(executionLog);

            executionConfiguration.setRemoteServer(slaveServer);
            carteObjectId = Trans.sendToSlaveServer( transMeta, executionConfiguration, null, null);

            log.info("trans carteObjectId:{}",carteObjectId);
            SlaveServer remoteSlaveServer = executionConfiguration.getRemoteServer();


            boolean running = true;
            while(running) {
                SlaveServerTransStatus transStatus = remoteSlaveServer.getTransStatus(transMeta.getName(), carteObjectId, 0);
                running = transStatus.isRunning();
                if(!running) errCount = transStatus.getResult().getNrErrors();
                try {

                    getRnStepStatus(executionLog.getId());
                } catch (Exception e) {
                    e.printStackTrace();
                }
                Thread.sleep(3000);
            }
            if(errCount > 0){
                status= TaskStatusState.STATE_FAIL.getStatus();
            }else {
                status = TaskStatusState.STATE_SUCCESS.getStatus();
            }
            executionLog.setStatus(status);
            executionLog.setExecutionLog(getExecutionLog());
        }catch (Exception e) {
            log.error(ExceptionUtils.getStackTrace(e));
            executionLog.setStatus(TaskStatusState.STATE_FAIL.getStatus());
            executionLog.setRemark(ExceptionUtils.getStackTrace(e));
        }finally {

            if(executionLog.getEndTime() == null) {
                executionLog.setEndTime(new Date());
            }
            String imgPath = KettleImagesUtil.getImagePath(executionConfiguration.getRemoteServer(), "trans", executionLog.getTaskName(),
                    carteObjectId, KettleConfigUtil.getKettleImagePath()+"/trans");
            executionLog.setImagePath(imgPath);
            rnExecutionLogService.updateRnExecutionLog(executionLog);
            try {
                //清理执行的中间信息

                getRnStepStatus(executionLog.getId());
            } catch (Exception e) {
                log.error(ExceptionUtils.getStackTrace(e));
            }
            TaskManageUtil.getInstance().remove(key);
            log.info("[{}]转换执行完成",executionLog.getTaskName());
        }

    }

    private void  getRnStepStatus(String taskId) throws Exception{
        SlaveServer remoteSlaveServer = executionConfiguration.getRemoteServer();
        SlaveServerTransStatus transStatus = remoteSlaveServer.getTransStatus(transMeta.getName(), carteObjectId, 0);
        List<StepStatus> stepStatusList = transStatus.getStepStatusList();

        if(stepStatusList !=null && stepStatusList.size() >0){
            List<RnStepStatus> rnStepStatusList = new ArrayList<>();
            IRnExecutionLogService rnExecutionLogService = KettleSpringContext.getBean(RnExecutionLogServiceImpl.class);
            SlaveService slaveService = KettleSpringContext.getBean(SlaveService.class);
            rnExecutionLogService.deleteStepStatus(taskId);

            for(StepStatus stepStatus :stepStatusList){
                RnStepStatus rnStepStatus = slaveService.getRnStepStatus(stepStatus);
                rnStepStatus.setTaskId(taskId);
                rnStepStatus.setId(IDUtil.getSnowflakeId());
                rnStepStatusList.add(rnStepStatus);

            }

            rnExecutionLogService.batchInsertStepStatus(rnStepStatusList);
        }
    }

    /**
     * 停止转换
     */
    public void stop() {
        if(trans!=null){
            trans.stopAll();
        }
    }

    public void pause() {
        if(!trans.isPaused())
            trans.pauseRunning();
        else
            trans.resumeRunning();
    }

    public void setClickStop(boolean clickStop) {
        this.isClickStop = clickStop;
    }

    public String getExecutionLog() throws Exception {

        SlaveServer remoteSlaveServer = executionConfiguration.getRemoteServer();
        SlaveServerTransStatus transStatus = remoteSlaveServer.getTransStatus(transMeta.getName(), carteObjectId, 0);
        return transStatus.getLoggingString();

    }
}
